Skip to content

fix: Clean shutdown for Sink threaded server using threading.Event#325

Merged
BulkBeing merged 7 commits intomainfrom
sink-threaded-clean-shutdown
Mar 5, 2026
Merged

fix: Clean shutdown for Sink threaded server using threading.Event#325
BulkBeing merged 7 commits intomainfrom
sink-threaded-clean-shutdown

Conversation

@BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Mar 3, 2026

Similar to #323, but for threaded server

Also

  • handles the regular grpc stream close (on pod delete).
  • Fixes UDF exception stacktrace propagation

For this UDF:

start = time.monotonic()

def udsink_handler(datums: Iterator[Datum]) -> Responses:
    responses = Responses()
    for msg in datums:
        elapsed = time.monotonic() - start
        if elapsed > 30:
            raise Exception("30 seconds elapsed")
        responses.append(Response.as_success(msg.id))
    return responses

UDF logs:

kubectl logs -f udsink-pipeline-out-0-hwmyc -c udsink
INFO:pynumaflow._constants:Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 15:03:36 INFO     Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 15:03:36 INFO     GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
INFO:pynumaflow._constants:GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
2026-03-03 15:04:08 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
2026-03-03 15:04:08 CRITICAL Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
CRITICAL:pynumaflow._constants:Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 72, in SinkFn
    ret = cur_task.join()
          ^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 63, in join
    raise self._exception
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py", line 40, in run
    self._return = self._target(*self._args, **self._kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 109, in _invoke_sink
    rspns = self.handler(request_queue.read_iterator())
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/example.py", line 17, in udsink_handler
    raise Exception("30 seconds elapsed")
Exception: 30 seconds elapsed
2026-03-03 15:04:08 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')
2026-03-03 15:04:08 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...
2026-03-03 15:04:08 CRITICAL Server exiting due to UDF error: 30 seconds elapsed
CRITICAL:pynumaflow._constants:Server exiting due to UDF error: 30 seconds elapsed

Numa:

{"timestamp":"2026-03-03T15:04:10.356463Z","level":"ERROR","message":"Error while writing messages","e":"Grpc(Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None })","target":"numaflow_core::pipeline::forwarder::sink_forwarder"}
{"timestamp":"2026-03-03T15:04:10.356555Z","level":"INFO","message":"Forwarder task completed","result":"Err(Grpc(Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None }))","target":"numaflow_core::pipeline::forwarder::sink_forwarder"}
{"timestamp":"2026-03-03T15:04:10.356616Z","level":"INFO","message":"Stopped the Lag-Reader Expose tasks","target":"numaflow_core::metrics"}
{"timestamp":"2026-03-03T15:04:10.356642Z","level":"ERROR","message":"Pipeline failed because of UDF failure","error":"Status { code: Internal, message: \"UDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\", details: b\"\\x08\\r\\x12IUDSinkError, UDF_EXECUTION_ERROR(udsink): Exception('30 seconds elapsed')\\x1a\\xc7\\x08\\n(type.googleapis.com/google.rpc.DebugInfo\\x12\\x9a\\x08\\x12\\x97\\x08Traceback (most recent call last):\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 72, in SinkFn\\n    ret = cur_task.join()\\n          ^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 63, in join\\n    raise self._exception\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/shared/thread_with_return.py\\\", line 40, in run\\n    self._return = self._target(*self._args, **self._kwargs)\\n                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py\\\", line 109, in _invoke_sink\\n    rspns = self.handler(request_queue.read_iterator())\\n            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\\n  File \\\"/opt/pysetup/examples/sink/log/example.py\\\", line 17, in udsink_handler\\n    raise Exception(\\\"30 seconds elapsed\\\")\\nException: 30 seconds elapsed\", source: None }","target":"numaflow_core"}
{"timestamp":"2026-03-03T15:04:10.357110Z","level":"INFO","message":"Gracefully Exiting...","target":"numaflow_core"}
{"timestamp":"2026-03-03T15:04:10.357184Z","level":"INFO","message":"Exited.","target":"numaflow"}

On regular pod delete (stream close), before the changes in this PR:

2026-03-03 14:10:55 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
2026-03-03 14:10:55 CRITICAL Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
CRITICAL:pynumaflow._constants:Traceback (most recent call last):
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/pynumaflow/sinker/servicer/sync_servicer.py", line 52, in SinkFn
    for d in request_iterator:
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 534, in _next
    request = self._look_for_request()
              ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 516, in _look_for_request
    _raise_rpc_error(self._state)
  File "/opt/pysetup/examples/sink/log/.venv/lib/python3.11/site-packages/grpc/_server.py", line 220, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
2026-03-03 14:10:55 CRITICAL UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
CRITICAL:pynumaflow._constants:UDSinkError, UDF_EXECUTION_ERROR(udsink): RpcError()
2026-03-03 14:10:55 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...
2026-03-03 14:10:55 CRITICAL Server exiting due to UDF error: 
CRITICAL:pynumaflow._constants:Server exiting due to UDF error: 

with the changes:

➜  kubectl logs -f udsink-pipeline-out-0-vcmtw
INFO:pynumaflow._constants:Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 14:24:23 INFO     Sync GRPC Sink listening on: unix:///var/run/numaflow/sink.sock with max threads: 4
2026-03-03 14:24:23 INFO     GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
INFO:pynumaflow._constants:GRPC Server listening on: unix:///var/run/numaflow/sink.sock 11
2026-03-03 14:24:42 WARNING  gRPC stream closed, shutting down the server.
WARNING:pynumaflow._constants:gRPC stream closed, shutting down the server.
2026-03-03 14:24:42 INFO     Shutdown signal received, stopping server gracefully...
INFO:pynumaflow._constants:Shutdown signal received, stopping server gracefully...

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@codecov
Copy link

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 91.66667% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.24%. Comparing base (23bc5d0) to head (20d00d5).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
packages/pynumaflow/pynumaflow/sinker/server.py 25.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #325      +/-   ##
==========================================
- Coverage   94.46%   94.24%   -0.23%     
==========================================
  Files          66       66              
  Lines        3071     3092      +21     
  Branches      158      162       +4     
==========================================
+ Hits         2901     2914      +13     
- Misses        141      148       +7     
- Partials       29       30       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing
Copy link
Contributor Author

BulkBeing commented Mar 3, 2026

On main branch, the actual UDF exception is not being shown in the errors tab
Screenshot 2026-03-03 at 8 16 28 PM

This PR fixes that:
Screenshot 2026-03-03 at 8 28 13 PM

Verified with async Sink server as well:
Screenshot 2026-03-04 at 6 46 25 AM

@BulkBeing BulkBeing marked this pull request as ready for review March 3, 2026 15:05
@BulkBeing BulkBeing requested a review from yhl25 March 4, 2026 01:20
Copy link
Contributor

@kohlisid kohlisid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm~
In the tests just to confirm whats the behavior of the inflight messages on server.stop? Ideally we are sending EOF, so that should allow the other messages to get processed before stopping

@kohlisid
Copy link
Contributor

kohlisid commented Mar 4, 2026

Also @BulkBeing could you check about coverage here?

@BulkBeing
Copy link
Contributor Author

lgtm~ In the tests just to confirm whats the behavior of the inflight messages on server.stop? Ideally we are sending EOF, so that should allow the other messages to get processed before stopping

During a normal shutdown (no UDF exception, like pod deletion), the server waits for in-flight requests to complete. In the case of shutdown due to UDF exception, we already sent a internal server error in update_context_err. Also, UDF function has returned at this point (since it raised exception). Any requests in the queue are effectively dropped.

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
@BulkBeing BulkBeing merged commit a66cdda into main Mar 5, 2026
12 checks passed
@BulkBeing BulkBeing deleted the sink-threaded-clean-shutdown branch March 5, 2026 02:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants